from pykafka import KafkaClient

# Module-wide Kafka client shared by every helper below; it targets the
# broker listed in ``hosts`` (a local broker on port 9092).
# NOTE(review): presumably constructing the client contacts the broker at
# import time, so importing this module needs a reachable broker — confirm.
client = KafkaClient(hosts="127.0.0.1:9092")


def start_consumer(topic, func, consumer_group=" "):
    """Consume messages from a topic and hand each payload to ``func``.

    Looks the topic up on the module-level ``client``, opens a balanced
    consumer with auto-commit enabled, and calls ``func(message.value)`` for
    every message yielded by the consumer. The call runs until the consumer's
    iteration ends or an exception propagates out of ``func``.

    Args:
        topic: Name of the topic, used as a key into ``client.topics``.
        func: Callable invoked with the ``value`` of each consumed message.
        consumer_group: Consumer group name passed to the balanced consumer.
            Defaults to ``" "`` to preserve the previous hard-coded value.
            NOTE(review): a single space looks like a placeholder; callers
            should probably pass a real group name.

    Raises:
        Whatever ``func`` or the underlying consumer raises; the consumer is
        stopped before the exception leaves this function.
    """
    print("consumer start")
    kafka_topic = client.topics[topic]
    consumer = kafka_topic.get_balanced_consumer(
        consumer_group, auto_commit_enable=True
    )
    try:
        for message in consumer:
            # Defensive: skip empty results instead of dereferencing None.
            if message is not None:
                func(message.value)
    finally:
        # Always shut the consumer down (also on KeyboardInterrupt or a
        # failing callback) so it is not left running after we return.
        consumer.stop()


def start_producer(topic, ite, func):
    """Transform every item of ``ite`` with ``func`` and publish it to a topic.

    Looks the topic up on the module-level ``client``, opens a synchronous
    producer, and calls ``produce(func(item))`` for each item in order.

    Args:
        topic: Name of the topic, used as a key into ``client.topics``.
        ite: Iterable of source items to publish.
        func: Callable that converts one item into the message payload
            handed to the producer (the example in this file encodes
            strings to UTF-8 bytes).

    Raises:
        Whatever ``func`` or the producer raises; the producer is stopped
        before the exception leaves this function.
    """
    print("producer start")
    kafka_topic = client.topics[topic]
    # The context manager stops the producer on every exit path, so it is
    # not leaked when ``func`` or ``produce`` fails midway.
    with kafka_topic.get_sync_producer() as producer:
        for item in ite:
            producer.produce(func(item))


def get_producer(topic):
    """Return a synchronous producer for the named topic.

    The topic is looked up on the module-level ``client``. The producer is
    handed back as-is; this function does not stop or close it, so that is
    left to the caller.

    Args:
        topic: Name of the topic, used as a key into ``client.topics``.

    Returns:
        The object returned by the topic's ``get_sync_producer()``.
    """
    return client.topics[topic].get_sync_producer()


def produce_one(pro, msg):
    """Publish a single message through an existing producer.

    Args:
        pro: Object exposing a ``produce(msg)`` method, e.g. the producer
            returned by ``get_producer``.
        msg: Payload forwarded unchanged to ``pro.produce``.

    Returns:
        None. Whatever ``pro.produce`` returns is discarded.
    """
    send = pro.produce
    send(msg)


if __name__ == "__main__":
    # Producer-side example, left disabled: publishes "1", "2", "3" to the
    # "test" topic, encoding each string as UTF-8 bytes.
    # start_producer("test", ["1", "2", "3"], lambda x: bytes(x, "utf-8"))
    # When run as a script, consume the "test" topic and print each
    # message value as it arrives.
    start_consumer("test", print)
